package priv.pfz.paxos.network;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import priv.pfz.paxos.common.EventQueue;
import priv.pfz.paxos.network.dto.AcceptReq;
import priv.pfz.paxos.network.dto.AcceptResp;
import priv.pfz.paxos.network.dto.PrepareReq;
import priv.pfz.paxos.network.dto.PrepareResp;
import priv.pfz.paxos.role.Acceptor;
import priv.pfz.paxos.role.Proposer;

import java.util.function.BiConsumer;
import java.util.function.Function;

import static priv.pfz.paxos.role.Events.*;

/**
 * 丢包可通过超时解决，这里简化为request或response为null
 * 不考虑重发，消息重发可通过简单的幂等处理解决，在此不再模拟
 * @author pengfangzhou
 * @date 2022/2/3 18:21
 */
@Slf4j
public class RpcManager {
    /**
     * 最小延迟ms
     */
    @Setter
    private int minLatency;

    /**
     * 最大延迟ms
     */
    @Setter
    private int maxLatency;

    /**
     * 丢包率
     */
    @Setter
    private int lostRate;

    /**
     * 网络通信事件（req到达/resp到达）的延迟队列
     */
    private final EventQueue eventQueue = new EventQueue();

    private int getLatency() {
        return RandomUtils.nextInt(minLatency, maxLatency);
    }

    private boolean isLost() {
        return RandomUtils.nextInt() % 100 < lostRate;
    }

    private <T, K> void asyncCall(Proposer proposer, Acceptor acceptor, T req,
                                         String reqAction, Function<T, K> reqHandler,
                                         String respAction, BiConsumer<T, K> respHandler) {
        //log.debug("{} send {} to {}: {}", proposer.getProposerId(), reqAction, acceptor.getAcceptorId(), req);
        int reqLatency = getLatency();
        eventQueue.add(() -> {
            //log.debug("{} receive {} from {}: {}", acceptor.getAcceptorId(), reqAction, proposer.getProposerId(), req);
            K resp = isLost() ? null : reqHandler.apply(req);
            //log.debug("{} resonse {} to {}: {}", acceptor.getAcceptorId(), respAction, proposer.getProposerId(), resp);
            int respLatency = getLatency();
            eventQueue.add(() -> {
                //log.debug("{} receive {} from {}: {}", proposer.getProposerId(), respAction, acceptor.getAcceptorId(), resp);
                respHandler.accept(req, isLost() ? null : resp);
            }, respLatency);
        }, reqLatency);
    }

    public void prepare(Proposer proposer, Acceptor acceptor, PrepareReq req,
                               Function<PrepareReq, PrepareResp> reqHandler,
                               BiConsumer<PrepareReq, PrepareResp> respHandler) {
        asyncCall(proposer, acceptor, req, PREPARE_REQ, reqHandler, PREPARE_RESP, respHandler);
    }

    public void accept(Proposer proposer, Acceptor acceptor, AcceptReq req,
                              Function<AcceptReq, AcceptResp> reqHandler,
                              BiConsumer<AcceptReq, AcceptResp> respHandler) {
        asyncCall(proposer, acceptor, req, ACCEPT_REQ, reqHandler, ACCEPT_RESP, respHandler);
    }
}
